// Copyright (C) Kumo inc. and its affiliates.
// Author: Jeff.li lijippy@163.com
// All rights reserved.
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published
// by the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program.  If not, see <https://www.gnu.org/licenses/>.
//
#pragma once

#include <pollux/exec/aggregate.h>
#include <pollux/exec/aggregate_info.h>
#include <pollux/exec/aggregation_masks.h>
#include <pollux/exec/distinct_aggregations.h>
#include <pollux/exec/operator.h>
#include <pollux/exec/sorted_aggregations.h>

namespace kumo::pollux::exec {
    class RowContainer;

    class StreamingAggregation : public Operator {
    public:
        StreamingAggregation(
            int32_t operatorId,
            DriverCtx *driverCtx,
            const std::shared_ptr<const core::AggregationNode> &aggregationNode);

        void initialize() override;

        void addInput(RowVectorPtr input) override;

        RowVectorPtr getOutput() override;

        bool needsInput() const override {
            return true;
        }

        BlockingReason isBlocked(ContinueFuture * /* unused */) override {
            return BlockingReason::kNotBlocked;
        }

        bool isFinished() override;

        void close() override;

    private:
        // Returns the rows to aggregate with masking applied if applicable.
        const SelectivityVector &getSelectivityVector(size_t aggregateIndex) const;

        // Allocate new group or re-use previously allocated group that has been fully
        // calculated and included in the output.
        char *startNewGroup(vector_size_t index);

        // Write grouping keys from the specified input row into specified group.
        void storeKeys(char *group, vector_size_t index);

        // Populate output_ vector using specified number of groups from the beginning
        // of the groups_ vector.
        RowVectorPtr createOutput(size_t numGroups);

        // Assign input rows to groups based on values of the grouping keys. Store the
        // assignments in inputGroups_.
        void assignGroups();

        // Add input data to accumulators.
        void evaluateAggregates();

        // Initialize the new groups calculated through current and previous groups.
        void initializeNewGroups(size_t numPrevGroups);

        // Create accumulators and RowContainer for aggregations.
        std::unique_ptr<RowContainer> makeRowContainer(
            const std::vector<TypePtr> &groupingKeyTypes);

        // Initialize the aggregations setting allocator and offsets.
        void initializeAggregates(uint32_t numKeys);

        /// Maximum number of rows in the output batch.
        const vector_size_t outputBatchSize_;

        // Used at initialize() and gets reset() afterward.
        std::shared_ptr<const core::AggregationNode> aggregationNode_;

        const core::AggregationNode::Step step_;

        std::vector<column_index_t> groupingKeys_;
        std::vector<AggregateInfo> aggregates_;
        std::unique_ptr<SortedAggregations> sortedAggregations_;
        std::vector<std::unique_ptr<DistinctAggregations> > distinctAggregations_;
        std::unique_ptr<AggregationMasks> masks_;
        std::vector<DecodedVector> decodedKeys_;

        // Storage of grouping keys and accumulators.
        std::unique_ptr<RowContainer> rows_;

        // Previous input vector. Used to compare grouping keys for groups which span
        // batches.
        RowVectorPtr prevInput_;

        // Unique groups.
        std::vector<char *> groups_;

        // Number of active entries at the beginning of the groups_ vector. The
        // remaining entries are re-usable.
        size_t numGroups_{0};

        // Reusable memory.

        // Pointers to groups for all input rows.
        std::vector<char *> inputGroups_;

        // A subset of input rows to evaluate the aggregate function on. Rows
        // where aggregation mask is false are excluded.
        SelectivityVector inputRows_;
    };
} // namespace kumo::pollux::exec
